Rework Local Activity scheduling #1507
String type =
    e instanceof ApplicationFailure
        ? ((ApplicationFailure) e).getType()
        : e.getClass().getName();
There was a problem hiding this comment.
Is it possible to set an explicit do-not-retry field in Java, or is it only string matching?
There was a problem hiding this comment.
Only string matching, because it gets converted to protobuf's RetryPolicy, at least for now.
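A minimal sketch of what matching on the failure type string could look like. `TypedFailure` is a hypothetical stand-in for the SDK's `ApplicationFailure`, and `DoNotRetryMatcher` and its methods are illustrative names, not the SDK's API.

```java
import java.util.List;

// Hypothetical stand-in for ApplicationFailure: only the type string matters here.
class TypedFailure extends RuntimeException {
  private final String type;

  TypedFailure(String type, String message) {
    super(message);
    this.type = type;
  }

  String getType() {
    return type;
  }
}

class DoNotRetryMatcher {
  // Same idea as the snippet under review: an explicit type wins,
  // otherwise fall back to the exception's class name.
  static String failureType(Throwable e) {
    return e instanceof TypedFailure ? ((TypedFailure) e).getType() : e.getClass().getName();
  }

  // A failure is non-retryable when its type string is listed verbatim.
  static boolean isNotRetryable(List<String> doNotRetryTypes, Throwable e) {
    return doNotRetryTypes.contains(failureType(e));
  }
}
```

Because the comparison is on plain strings, a subclass of a listed exception is not matched in this sketch unless its own class name is listed too.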
@@ -286,10 +285,10 @@ private void processLocalActivityRequests(long startTimeNs) throws InterruptedEx
// much sense. I believe we should add ScheduleToStart timeout for the local activities
// as well.
localActivityTaskPoller.apply(
    new LocalActivityTask(laRequest, localActivityCompletionSink),
    Duration.ofNanos(maxWaitTimeNs));
laRequest.setScheduleToStartTimeout(Duration.ofNanos(maxWaitTimeNs));
There was a problem hiding this comment.
Can the user not set a schedule-to-start timeout? (Honestly, that makes sense.)
There was a problem hiding this comment.
It's old logic. It will be gone in #1512.
We can discuss, but I don't see any "very good" solution for leaving scheduleToStart unset. I guess users will expect us to keep workflow tasks open if they have some unexpected backfilling in their systems. A reasonable alternative may be to set it to localRetryThreshold. The second approach is safer from a load-control standpoint, but may not be what users expect.
private final PollActivityTaskQueueResponse.Builder activityTask;
public static final long NOT_SCHEDULED = -1;

// It doesn't have all the fields published yet.
There was a problem hiding this comment.
What doesn't? This builder? Change to "This builder won't..."
private static final RetryState LOCAL_RETRY_LIMIT_RETRY_STATE =
    RetryState.RETRY_STATE_IN_PROGRESS;
There was a problem hiding this comment.
It's used in a couple of places, and I wanted to write a long comment on this value choice. I don't want that comment to be duplicated.
// Making sure that the result handling code following this statement is mutual exclusive
// with the start to close timeout handler.
boolean startToCloseTimeoutFired =
    startToCloseTimeoutFuture != null && !startToCloseTimeoutFuture.cancel(false);
There was a problem hiding this comment.
Since this is coming after handle, does this mean the start-to-close timeout cannot "interrupt" the running activity if it fires - but does it tell the workflow about the timeout right away?
So here we're just saying "OK we (elsewhere) told the WF the LA timed out, and now we're just discarding the result when the LA eventually finishes"?
Do we try to notify the LA of cancel / interrupt it at all? I'm guessing not since there's not a good mechanism for it
There was a problem hiding this comment.
Correct. Right now there is no interruption or cancellation of local activities in java-sdk. We have a task for it, #1303, and I guess it will be done at some point.
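The behavior described in this thread can be sketched with a plain `ScheduledExecutorService`. This is an illustration under assumptions, not the PR's code: `StartToCloseSketch`, `execute`, `slowActivity`, and the `TIMED_OUT` marker are hypothetical names. The timer reports the timeout as soon as it fires, the activity itself is never interrupted, and a late result is discarded.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

class StartToCloseSketch {
  static final String TIMED_OUT = "TIMED_OUT";

  // Runs the activity on the calling thread and returns whatever was reported first:
  // the timeout marker or the activity's result.
  static String execute(Supplier<String> activity, long startToCloseMs) {
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    // Holds the single outcome that "reaches the workflow"; the first writer wins.
    AtomicReference<String> reported = new AtomicReference<>();
    try {
      ScheduledFuture<?> timeoutFuture =
          timer.schedule(
              () -> {
                reported.compareAndSet(null, TIMED_OUT);
              },
              startToCloseMs,
              TimeUnit.MILLISECONDS);

      // The activity is not interrupted when the timer fires; it runs to completion.
      String result = activity.get();

      // cancel(false) returns false when the timeout handler has already completed.
      boolean timeoutFired = !timeoutFuture.cancel(false);
      if (!timeoutFired) {
        // compareAndSet also covers the narrow window where the handler is mid-run.
        reported.compareAndSet(null, result);
      }
      return reported.get();
    } finally {
      timer.shutdownNow();
    }
  }

  // Test helper: simulates an activity that takes `millis` to finish.
  static String slowActivity(long millis, String value) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return value;
  }
}
```

For example, `execute(() -> slowActivity(400, "late"), 50)` reports the timeout and drops `"late"` once the activity eventually returns.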
}

if (executionThrowable instanceof Error) {
  // TODO Error inside Local Activity shouldn't be failing the local activity call.
There was a problem hiding this comment.
Meant to be done in this PR?
There was a problem hiding this comment.
It's old existing behavior. It's incorrect, but it's not trivial to fix correctly, and it's not directly relevant to the goal of this PR. I opened a separate task for it: #1533
if (isRetryPolicyNotSet(activityTask)) {
  executionContext.callback(
      failed(
          activityId,
          RetryState.RETRY_STATE_RETRY_POLICY_NOT_SET,
          executionFailure,
          currentAttempt,
          null));
  return;
}

RetryOptions retryOptions = RetryOptionsUtils.toRetryOptions(activityTask.getRetryPolicy());

if (RetryOptionsUtils.isNotRetryable(retryOptions, executionThrowable)) {
  executionContext.callback(
      failed(
          activityId,
          RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE,
          executionFailure,
          currentAttempt,
          null));
  return;
}

if (RetryOptionsUtils.areAttemptsReached(retryOptions, currentAttempt)) {
  executionContext.callback(
      failed(
          activityId,
          RetryState.RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED,
          executionFailure,
          currentAttempt,
          null));
  return;
}
There was a problem hiding this comment.
Could combine these all by just setting a retry state local var and then passing that into the failed call -- looks like the next few too
There was a problem hiding this comment.
Good eye, will refactor it.
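One possible shape for the suggested refactoring, as a sketch only: compute the terminal retry state once, then make a single `failed(...)` call. The `RetryState` enum below is a hypothetical stand-in for the protobuf enum, the boolean and int parameters replace the real `RetryOptionsUtils` checks, and treating a non-positive `maximumAttempts` as "unlimited" is an assumption of this example.

```java
import java.util.Optional;

class RetryStateSketch {
  // Hypothetical stand-in for the protobuf RetryState enum used in the snippet above.
  enum RetryState {
    RETRY_STATE_RETRY_POLICY_NOT_SET,
    RETRY_STATE_NON_RETRYABLE_FAILURE,
    RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED
  }

  // Returns the terminal state if the attempt must not be retried,
  // or empty if another attempt may be scheduled.
  static Optional<RetryState> terminalState(
      boolean retryPolicySet, boolean nonRetryable, int maximumAttempts, int currentAttempt) {
    if (!retryPolicySet) {
      return Optional.of(RetryState.RETRY_STATE_RETRY_POLICY_NOT_SET);
    }
    if (nonRetryable) {
      return Optional.of(RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE);
    }
    // Assumption: maximumAttempts <= 0 means "no limit".
    if (maximumAttempts > 0 && currentAttempt >= maximumAttempts) {
      return Optional.of(RetryState.RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED);
    }
    return Optional.empty();
  }
}
```

The caller would then need only one branch: if a terminal state is present, pass it to a single `failed(activityId, state, executionFailure, currentAttempt, null)` callback and return.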
public void run() {
  LocalActivityExecutionContext executionContext = attemptTask.getExecutionContext();
There was a problem hiding this comment.
FYI: I realized yesterday Core isn't retrying start-to-close timeouts right now, we opened a bug to do it
executionContext.callback(
    failed(
        activityId,
        LOCAL_RETRY_LIMIT_RETRY_STATE,
There was a problem hiding this comment.
Is this the state where we schedule a timer?
There was a problem hiding this comment.
Where we WILL schedule a timer in one of the upcoming PRs, yeah.
What was changed
This PR implements a timer thread that performs lifecycle management of local activities independently of the local activity executors, which makes it possible to enforce timeouts.
The interface between LA workers and workflow code / state machines is reworked to allow LA workers to communicate different failure reasons / retry back to the workflow.
Support for the startToClose timeout and the scheduleToClose timeout is implemented.
The ground is prepared for adding scheduleToStart timeout for local activities #1512
The workflow/LA interface is improved to prepare for #1261
Why?
Previously, the Local Activity scheduling and management code was trivial.
One thread executed the activity, slept until the next attempt, and then executed the next attempt.
While simple, this approach has several cons:
Neither startToClose nor scheduleToClose timeouts can be enforced, because the only thread that manages an activity invocation is tied to the execution of the activity's code.
Closes #1004